/**
 * Copyright (c) 2023 murenchao
 * taomu is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *       http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
package cool.taomu.mqtt.broker

import cool.taomu.mqtt.broker.entity.MqttBrokerEntity
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.mqtt.MqttDecoder
import io.netty.handler.codec.mqtt.MqttEncoder
import io.netty.handler.timeout.IdleStateHandler
import org.slf4j.LoggerFactory

class MQTTBroker {
	val static LOG = LoggerFactory.getLogger(MQTTBroker);
	var NioEventLoopGroup selectGroup;
	var NioEventLoopGroup ioGroup;
	var MqttBrokerEntity config;

	val coreNumber = Runtime.getRuntime().availableProcessors;

	new(MqttBrokerEntity config) {
		this.selectGroup = new NioEventLoopGroup(coreNumber);
		this.ioGroup = new NioEventLoopGroup(coreNumber * 2);
		this.config = config;
	}

	def startTcpServer() {
		var serverBootstrap = new ServerBootstrap();
		val bootstrap = serverBootstrap.group(selectGroup, ioGroup);
		var channel = bootstrap.channel(NioServerSocketChannel)
		channel.option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.TCP_NODELAY, false).option(
			ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, false);
		channel.childHandler(new ChannelInitializer<SocketChannel>() {
			override protected initChannel(SocketChannel ch) throws Exception {
				var pipeline = ch.pipeline();
				if (config.useSsl) {
					pipeline.addLast("ssl", MQTTSslHandler.build(ch, config.ssl));
				}
				pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0));
				pipeline.addLast("mqttEncoder", MqttEncoder.INSTANCE);
				pipeline.addLast("mqttDecoder", new MqttDecoder(Integer.MAX_VALUE));
				pipeline.addLast("nettyMqttHandler", new MQTTHandler());
			}
		})
		Runtime.runtime.addShutdownHook(new Thread() {
			override run() {
				selectGroup.shutdownGracefully();
				ioGroup.shutdownGracefully();
			}
		});
		LOG.info("启动MQTT代理服务 ip: {} port: {}", config.hostname, config.port);
		bootstrap.bind(config.hostname, config.port).sync();
	}
}
